package com.huan.hadoop.serializable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reduce操作
 * 手机号作为key， PhoneFlow对象作为值
 *
 * @author huan.fu
 * @date 2023/7/10 - 19:40
 */
public class PhoneFlowReducer extends Reducer<PhoneFlow, PhoneFlow, Text, PhoneFlow> {

    /**
     * 需要输出的数据
     */
    private final PhoneFlow outValue = new PhoneFlow();
    /**
     * 需要输出的key
     */
    private final Text outKey = new Text();

    @Override
    protected void reduce(PhoneFlow key, Iterable<PhoneFlow> values, Reducer<PhoneFlow, PhoneFlow, Text, PhoneFlow>.Context context) throws IOException, InterruptedException {

        // 总的上行流量
        long totalUpFlow = 0L;
        // 总的下行流程
        long totalDownFlow = 0L;
        // 总的流量
        long totalSumFlow = 0L;

        // 计算
        for (PhoneFlow row : values) {
            totalUpFlow = totalUpFlow + row.getUpFlow();
            totalDownFlow = totalDownFlow + row.getDownFlow();
            totalSumFlow = totalSumFlow + row.getSumFlow();
        }

        // 设置值
        outValue.setUpFlow(totalUpFlow);
        outValue.setDownFlow(totalDownFlow);
        outValue.setSumFlow(totalSumFlow);

        outKey.set(key.getPhone());

        // 输出
        context.write(outKey, outValue);
    }
}
